feat(stream): support set nats consumer deliver policy as latest, earliest, by sequence, by timestamp #12176
 let consumer = properties
     .common
-    .build_consumer(0, splits[0].start_sequence)
+    .build_consumer(0, start_position.clone())
I think it is bad practice to hardcode the split_id 0 everywhere in the code. Here you should get the split id from the input split. I suggest saving the split id in the SplitReader when you create the reader. Then you can use that split_id to build a SourceMessage instead of hardcoding it in from_nats_jetstream_message.
By the way, you could impl From<NatsMessage> for SourceMessage to do the conversion.
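A minimal sketch of this suggestion, assuming simplified stand-in types: the `NatsMessage`, `SourceMessage`, and `NatsSplitReader` definitions below are hypothetical and much smaller than the real RisingWave ones. The reader remembers the split id it was created with and stamps it on every message, so the `From` conversion never needs a hardcoded 0.

```rust
// Hypothetical, simplified stand-ins for illustration only.
// The real connector types carry more fields and differ in detail.

/// A raw message as seen by the reader, tagged with the split it came from.
#[derive(Debug, Clone, PartialEq)]
pub struct NatsMessage {
    pub split_id: String,
    pub sequence: u64,
    pub payload: Vec<u8>,
}

/// The connector-agnostic message handed to the rest of the pipeline.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceMessage {
    pub split_id: String,
    pub offset: String,
    pub payload: Option<Vec<u8>>,
}

impl From<NatsMessage> for SourceMessage {
    fn from(msg: NatsMessage) -> Self {
        SourceMessage {
            // Taken from the message itself, not hardcoded.
            split_id: msg.split_id,
            offset: msg.sequence.to_string(),
            payload: Some(msg.payload),
        }
    }
}

/// The reader stores the split id once, at construction time.
pub struct NatsSplitReader {
    split_id: String,
}

impl NatsSplitReader {
    pub fn new(split_id: impl Into<String>) -> Self {
        Self { split_id: split_id.into() }
    }

    /// Tag a raw payload with this reader's split id.
    pub fn wrap(&self, sequence: u64, payload: Vec<u8>) -> NatsMessage {
        NatsMessage {
            split_id: self.split_id.clone(),
            sequence,
            payload,
        }
    }
}
```

With this shape, a call site would read roughly `let msg: SourceMessage = reader.wrap(seq, bytes).into();`, and adding more splits later would not require touching the conversion.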
Thanks for the reminder. I have applied your suggestions. :)
    Timestamp(i128),
    None,
}

/// The states of a NATS split, which will be persisted to checkpoint.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct NatsSplit {
    pub(crate) subject: String,
    // TODO: to simplify the logic, return 1 split for first version. May use parallelism in
    // future.
    pub(crate) split_num: i32,
Maybe refactor this to split_id.
src/connector/Cargo.toml
Outdated
@@ -41,7 +41,9 @@ chrono = { version = "0.4", default-features = false, features = [
     "clock",
     "std",
 ] }
-clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = ["time"] }
+clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [
These seem to be unintended modifications to this file.
DeliverPolicy::ByStartSequence {
    start_sequence: 1 + parsed,
This behavior is not consistent with other connectors: starting with seq should include the seq-th message.
This was actually an attempt to fix a problem on resume.
With the by sequence policy it may cause a problem (we would need to manually subtract 1 from the input sequence number). For now I have applied your suggestion and removed the by sequence policy, so this is fine.
It is quite easy to misuse the sequence number, so let's remove it. Related to #12241.
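The off-by-one discussed in this thread can be sketched as follows. This is only an illustration under assumptions: the `StartFrom` enum and `first_sequence_to_read` function are hypothetical names, not part of the PR, and the PR itself ended up dropping the by sequence policy. The idea is that a user-supplied sequence is inclusive, while a checkpointed sequence is the last message already consumed, so only the latter needs `+ 1`.

```rust
/// Hypothetical: where a start sequence number came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartFrom {
    /// Sequence given by the user; the message with this sequence
    /// should be delivered (inclusive), matching other connectors.
    UserSequence(u64),
    /// Sequence restored from a checkpoint; the message with this
    /// sequence was already consumed before the restart.
    Checkpointed(u64),
}

/// Returns the sequence number of the first message to read.
pub fn first_sequence_to_read(start: StartFrom) -> u64 {
    match start {
        StartFrom::UserSequence(seq) => seq,
        // Resume right after the last consumed message.
        StartFrom::Checkpointed(last) => last.saturating_add(1),
    }
}
```

Keeping the two cases as separate variants avoids applying `1 + parsed` to a user-provided value, which is the inconsistency the reviewer pointed out.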
Codecov Report
@@ Coverage Diff @@
## yufan/nats-connect #12176 +/- ##
======================================================
- Coverage 69.75% 69.74% -0.02%
======================================================
Files 1407 1407
Lines 235565 235596 +31
======================================================
- Hits 164314 164309 -5
- Misses 71251 71287 +36
... and 2 files with indirect coverage changes
merge into #12227
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
This PR tries to add several deliver policies (latest, earliest, by sequence, by timestamp). Test the policy under
We decided to merge it into #12227 first. We will use that branch to build an image for the user.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
Add the latest, earliest, and by timestamp policies. We currently decided not to support the by sequence policy.
The default is earliest.
For the latest policy, set scan.startup.mode='latest'.
For the earliest policy, set scan.startup.mode='earliest'.
For the by timestamp (millis) policy, set scan.startup.mode='timestamp_millis' and also set nats.scan.startup.timestamp_millis.
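As a rough illustration of how these options could be interpreted, here is a sketch that maps the properties to a start position. The option keys and values come from the release note above; the `StartupMode` enum and `parse_startup_mode` function are hypothetical names and do not reflect the actual connector code.

```rust
use std::collections::HashMap;

/// Hypothetical representation of the supported startup modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartupMode {
    Earliest,
    Latest,
    TimestampMillis(i64),
}

/// Parse the startup mode from connector properties.
/// A missing `scan.startup.mode` falls back to earliest, the stated default.
pub fn parse_startup_mode(props: &HashMap<String, String>) -> Result<StartupMode, String> {
    match props.get("scan.startup.mode").map(String::as_str) {
        None | Some("earliest") => Ok(StartupMode::Earliest),
        Some("latest") => Ok(StartupMode::Latest),
        Some("timestamp_millis") => {
            // The timestamp mode requires the companion option.
            let raw = props
                .get("nats.scan.startup.timestamp_millis")
                .ok_or_else(|| "nats.scan.startup.timestamp_millis is required".to_string())?;
            raw.parse::<i64>()
                .map(StartupMode::TimestampMillis)
                .map_err(|e| format!("invalid timestamp_millis `{}`: {}", raw, e))
        }
        // Anything else, including a by sequence mode, is rejected.
        Some(other) => Err(format!("unsupported scan.startup.mode `{}`", other)),
    }
}
```

Returning an error for unknown modes, rather than silently falling back to the default, makes a mistyped option visible when the source is created.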